package ins.framework.kafka.assignpartion;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.*;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.network.BlockingChannel;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;

public class OffsetUtil {
    public static  BlockingChannel  createChannel(String host){
        String print = Thread.currentThread().getName();
        BlockingChannel channel = null;
        if(channel == null){
            print = print + "创建连接offset manager的 channel";
            channel = new BlockingChannel(host, 9092,
                    BlockingChannel.UseDefaultBufferSize(),
                    BlockingChannel.UseDefaultBufferSize(),
                    5000 /* read timeout in millis */);
            channel.connect();
            //channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
        }else{
            print = print + "已创建";
        }
        System.out.println(print);
        int correlationId = 0;
        return channel;
    }

    // 获取消费位置
    public static long offsetFetch(BlockingChannel channel,String clientId, String group, List<TopicAndPartition> partitions, int correlationId){
        long offset = -1;
        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
                group,
                partitions,
                (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
                correlationId,
                clientId);
        try {
            if(channel.isConnected()){
                System.out.println("channel connected");
            }
            long temp = channel.send(fetchRequest.underlying());
            ByteBuffer byteBuffer = channel.receive().payload();
            OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(byteBuffer);
            OffsetMetadataAndError result = fetchResponse.offsets().get(partitions.get(0));
            short offsetFetchErrorCode = result.error().code();
            if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
                channel.disconnect();
                System.out.println("获取失败"+result.error().message());
                // Go to step 1 and retry the offset fetch
            } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
                // retry the offset fetch (after backoff)
                System.out.println("获取失败_正在将消费信息存入内存中"+result.error().message());
            } else {
                offset = result.offset();
                String retrievedMetadata = result.metadata();

            }
        }
        catch (Exception e) {
            e.printStackTrace();
            channel.disconnect();
            // Go to step 1 and then retry offset fetch after backoff
            System.out.println("channel 已关闭"+e.getMessage());
        }
        return offset;
    }

    //提交消费位置
    public static void commitOffset(BlockingChannel channel,String clientId, String group,int correlationId,Map<TopicAndPartition, OffsetAndMetadata> offsets){
        long now = System.currentTimeMillis();
        OffsetCommitRequest commitRequest = new OffsetCommitRequest(
                group,
                offsets,
                correlationId++,
                clientId,
                (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper
        try {
            channel.send(commitRequest.underlying());
            OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().payload());
            if (commitResponse.hasError()) {
                for (Errors errors: commitResponse.errors().values()) {
                    short partitionErrorCode = errors.code();
                    if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
                        // You must reduce the size of the metadata if you wish to retry
                        System.out.println("offset commit error_offset size too large");
                    } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                        channel.disconnect();
                        System.out.println("offset commit error_NotCoordinatorForConsumer");

                        // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
                    } else {
                        // log and retry the commit
                        System.out.println("offset commit error"+errors.message());
                    }
                }
            }
        }
        catch (Exception ioe) {
            channel.disconnect();
            // Go to step 1 and then retry the commit
            System.out.println("offset commit error_NotChannel");
        }
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
    public static int getPartitonNum(String key,int numPartitions){
        //Utils.murmur2(key.getBytes())
        return Math.abs(key.hashCode()) % numPartitions;
    }



    public static void main(String[] args) {
        String group = "24",broker = "10.10.1.7";
        String clientId = "console-consumer-59377";
        String topicName = "postgres.test0814";
        int parTitionNum = 1,correlationId=2;
        System.out.println(getOffsetPartition(group));
        final TopicAndPartition testPartition0 = new TopicAndPartition(topicName, parTitionNum);
        List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
        partitions.add(testPartition0);


        Map<TopicAndPartition, OffsetAndMetadata> offsets = new HashMap<>();
        OffsetMetadata offsetMetadata = new OffsetMetadata(10, "s_s0");
        OffsetAndMetadata offsetAndMetadata =
                new OffsetAndMetadata(offsetMetadata, System.currentTimeMillis(), System.currentTimeMillis()
                        + 2*60*60*1000);
        offsets.put(testPartition0, offsetAndMetadata);
        BlockingChannel channel = createChannel(broker);
        commitOffset(channel,clientId,group,correlationId,offsets);

      //  SimpleConsumer consumer = new SimpleConsumer(broker,9092, 100000, 64 * 1024, clientId);
      //  long offset =  getLastOffset(consumer,topicName,parTitionNum,kafka.api.OffsetRequest.EarliestTime(),clientId);

    //   long offset = offsetFetch(channel,clientId,group,partitions,3);
        //System.out.println("当前分区消费位置"+offset);
    }

    public  static int getOffsetPartition(String groupId){
        return Math.abs(groupId.hashCode()%50);
    }

}
